package com.migrate.module.migrate;

import cn.hutool.core.date.DateTime;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.migrate.module.config.ApplicationContextUtil;
import com.migrate.module.constants.Constants;
import com.migrate.module.domain.*;
import com.migrate.module.enumeration.EtlProgressStatus;
import com.migrate.module.lock.ScrollLock;
import com.migrate.module.mapper.migrate.MigrateScrollMapper;
import com.migrate.module.service.MigrateService;
import com.migrate.module.util.DateUtils;
import com.migrate.module.util.SnowflakeIdWorker;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * 全量滚动数据处理
 *
 * @author zhonghuashishan
 */
@Slf4j
@Component
public class ScrollProcessor {

    /**
     * Default number of rows fetched per scroll batch.
     */
    private static final Integer PAGE_SIZE = 500;
    /**
     * Per-table lock: ensures at most one full scroll runs for a given table at a time.
     */
    private final ScrollLock lock = new ScrollLock();

    @Resource
    private MigrateScrollMapper migrateScrollMapper;

    /**
     * Entry point for a full data sync. Can resume from a given batch by pre-setting
     * {@code startScrollId} on the model; otherwise the minimum order number is used
     * as the starting scroll position.
     *
     * @param rangeScroll extraction model (table name, time range, paging, ticket)
     */
    public void scroll(RangeScroll rangeScroll){
        boolean lockState = this.lock.lock(rangeScroll.getTableName());
        if (lockState){
            try {
                // Default the batch size when the caller did not specify one
                if (ObjectUtils.isEmpty(rangeScroll.getPageSize())){
                    rangeScroll.setPageSize(PAGE_SIZE);
                }
                // A ticket uniquely identifies one full-sync run
                if (ObjectUtils.isEmpty(rangeScroll.getTicket())){
                    rangeScroll.setTicket(SnowflakeIdWorker.getCode());
                }
                // No resume position supplied: start scrolling from the smallest order number
                if (StringUtils.isEmpty(rangeScroll.getStartScrollId())) {
                    MigrateService migrateService = ApplicationContextUtil.getBean(MigrateService.class);
                    String orderNo = migrateService.queryMinOrderNo(rangeScroll);
                    rangeScroll.setStartScrollId(orderNo);
                }
                EtlProgress etlProgress = addEtlProgress(rangeScroll);
                // Process the scrolled data batch by batch
                scrollDate(etlProgress,rangeScroll);
            }catch (Exception e){
                log.error("滚动拉取数据异常{}",JSONObject.toJSONString(rangeScroll),e);
            }finally {
                lock.unlock(rangeScroll.getTableName());
            }
        }
    }

    /**
     * Scrolls through the source data in a loop until the query returns no rows,
     * then marks the progress record as finished.
     *
     * @param etlProgress progress record for this sync run
     * @param rangeScroll extraction model, whose startScrollId advances each batch
     */
    private void scrollDate(EtlProgress etlProgress,RangeScroll rangeScroll){
        EtlDirtyRecord etlDirtyRecord = null;
        try {
            MigrateService migrateService = ApplicationContextUtil.getBean(MigrateService.class);
            // Pull the first batch; when the final batch has been consumed the
            // progress record is marked as finished below.
            List<Map<String, Object>> queryInfoList = migrateService.queryInfoList(rangeScroll);
            while (CollectionUtils.isNotEmpty(queryInfoList)){
                // Merge/write this batch; the returned ScrollInfo carries the last
                // row's paging key so the next query can continue after it.
                MergeBinlogWrite mergeBinlogWrite = new MergeBinlogWrite();
                ScrollInfo scrollInfo = mergeBinlogWrite.load(queryInfoList, rangeScroll);
                // When zero rows were actually written, the range may already be fully
                // synced: if the tracked progress reaches 100%, finish the task.
                if (checkEtlProgressSuccess(scrollInfo,etlProgress)){
                    // Task already complete — checkEtlProgressSuccess persisted the status
                    updateEtlProgressSuccess(etlProgress);
                    return;
                }
                // Record a detail row for this batch
                etlDirtyRecord = insertEtlDirtyRecord(etlProgress,rangeScroll);

                // Advance the scroll cursor to the last key of this batch
                rangeScroll.setStartScrollId(scrollInfo.getMaxScrollId());
                etlProgress.setScrollId(scrollInfo.getMaxScrollId());
                // NOTE(review): this increment is overwritten inside updateEtlProgress,
                // which copies curTicketStage back from the detail record created above
                // (i.e. the pre-increment value) — confirm whether the stage is meant
                // to advance here.
                etlProgress.setCurTicketStage(etlProgress.getCurTicketStage()+1);
                etlDirtyRecord.setSyncSize(scrollInfo.getScrollSize());
                // Persist this batch as successfully synced
                updateEtlProgress(etlProgress,etlDirtyRecord, EtlProgressStatus.SUCCESS.getValue(),scrollInfo.getScrollSize());

                // Keep scrolling
                queryInfoList = migrateService.queryInfoList(rangeScroll);
            }
            updateEtlProgressSuccess(etlProgress);
        }catch (Exception e){
            log.error("循环滚动数据错误", e);
            if (null != etlProgress)
            {
                // etlDirtyRecord may still be null when the failure happened before
                // the detail row was created; updateEtlProgress handles that.
                updateEtlProgressFail(etlProgress,etlDirtyRecord,0);
            }
        }
    }

    /**
     * Builds (and, except on retry, persists) the progress record for this run.
     *
     * @param rangeScroll extraction model
     * @return the initialized progress record
     */
    private EtlProgress addEtlProgress(RangeScroll rangeScroll){
        EtlProgress etlProgress = new EtlProgress();

        etlProgress.setLogicModel(rangeScroll.getTableName());
        if (ObjectUtils.isEmpty(rangeScroll.getCurTicketStage())){
            etlProgress.setCurTicketStage(1);
        } else {
            etlProgress.setCurTicketStage(rangeScroll.getCurTicketStage()+1);
        }
        etlProgress.setTicket(rangeScroll.getTicket());
        if (!ObjectUtils.isEmpty(rangeScroll.getRetryTimes())){
            etlProgress.setRetryTimes(rangeScroll.getRetryTimes());
        } else {
            etlProgress.setRetryTimes(0);
        }

        // On resume, the caller smuggles the already-finished record count in via
        // pageSize; copy it into finishRecord and reset paging to the default so a
        // resumed run never tries to fetch millions of rows in one query.
        if (PAGE_SIZE.equals(rangeScroll.getPageSize())){
            etlProgress.setFinishRecord(0);
        }else{
            etlProgress.setFinishRecord(rangeScroll.getPageSize());
            rangeScroll.setPageSize(PAGE_SIZE);
        }

        etlProgress.setProgressType(0);
        etlProgress.setStatus(EtlProgressStatus.INIT.getValue());
        etlProgress.setScrollId(rangeScroll.getStartScrollId());
        etlProgress.setScrollTime(rangeScroll.getStartTime());
        etlProgress.setScrollEndTime(rangeScroll.getEndTime());
        etlProgress.setCreateTime(new DateTime());
        etlProgress.setUpdateTime(new DateTime());
        // On retry the row already exists — do not insert again.
        // Boolean.TRUE.equals guards against an unset (null) flag, which previously
        // threw an NPE through auto-unboxing.
        if (Boolean.TRUE.equals(rangeScroll.getRetryFlag())){
            rangeScroll.setRetryFlag(false);
            return etlProgress;
        } else {
            migrateScrollMapper.insertEtlProgress(etlProgress);
        }
        return etlProgress;
    }

    /**
     * Inserts a detail (dirty) record for the current batch, snapshotting the
     * batch's stage, scroll key and expected size.
     *
     * @param etlProgress progress record the batch belongs to
     * @param rangeScroll extraction model supplying the page size
     * @return the persisted detail record
     */
    private EtlDirtyRecord insertEtlDirtyRecord(EtlProgress etlProgress,RangeScroll rangeScroll){
        EtlDirtyRecord etlDirtyRecord = new EtlDirtyRecord();
        etlDirtyRecord.setLogicModel(etlProgress.getLogicModel());
        etlDirtyRecord.setCurTicketStage(etlProgress.getCurTicketStage());
        etlDirtyRecord.setTicket(etlProgress.getTicket());
        etlDirtyRecord.setStatus(EtlProgressStatus.INIT.getValue());
        etlDirtyRecord.setRetryTimes(0);
        etlDirtyRecord.setSyncSize(rangeScroll.getPageSize());
        etlDirtyRecord.setRecordValue(etlProgress.getScrollId());
        etlDirtyRecord.setRecordKey(MergeConfig.getSingleKey(etlProgress.getLogicModel()));
        etlDirtyRecord.setCreateTime(new DateTime());
        etlDirtyRecord.setUpdateTime(new DateTime());
        migrateScrollMapper.insertEtlDirtyRecord(etlDirtyRecord);

        return etlDirtyRecord;
    }

    /**
     * Marks this sync run as failed and persists the state.
     *
     * @param etlProgress    progress record to fail
     * @param etlDirtyRecord batch detail record; may be null when the failure
     *                       occurred before the batch detail was created
     * @param pageSize       rows synced in the failing batch (added to finishRecord)
     */
    private void updateEtlProgressFail(EtlProgress etlProgress,EtlDirtyRecord etlDirtyRecord,Integer pageSize){
        etlProgress.setStatus(EtlProgressStatus.FAIL.getValue());
        updateEtlProgress(etlProgress, etlDirtyRecord, EtlProgressStatus.FAIL.getValue(), pageSize);
    }

    /**
     * Persists the progress record (accumulating the finished-row count) and,
     * when present, the batch detail record with the given status.
     *
     * @param etlProgress    progress record to update
     * @param etlDirtyRecord batch detail record; null-safe (the failure path may
     *                       reach here before a detail record exists)
     * @param status         status to stamp on the detail record
     * @param pageSize       rows synced in this batch
     */
    private void updateEtlProgress(EtlProgress etlProgress,EtlDirtyRecord etlDirtyRecord,Integer status,Integer pageSize){
        etlProgress.setUpdateTime(new DateTime());
        etlProgress.setFinishRecord(etlProgress.getFinishRecord()+pageSize);
        if (etlDirtyRecord != null) {
            etlProgress.setCurTicketStage(etlDirtyRecord.getCurTicketStage());
        }
        migrateScrollMapper.updateEtlProgress(etlProgress);
        // Update the batch detail status (skipped when no detail record was created,
        // which previously caused an NPE inside the caller's catch block)
        if (etlDirtyRecord != null) {
            updateEtlDirtyRecord(etlDirtyRecord,status);
        }
    }

    /**
     * Marks this sync run as successfully completed and persists the state.
     *
     * @param etlProgress progress record to complete
     */
    public void updateEtlProgressSuccess(EtlProgress etlProgress){
        etlProgress.setUpdateTime(new DateTime());
        etlProgress.setStatus(EtlProgressStatus.SUCCESS.getValue());
        migrateScrollMapper.updateEtlProgress(etlProgress);
    }

    /**
     * When a batch wrote zero rows, the time range may already be fully synced.
     * Compares the finished-row count against the expected total (computed per
     * day by CountCacheTask and cached per ticket); if progress has reached 100%,
     * marks the run as complete.
     *
     * @param scrollInfo  info about the batch just written
     * @param etlProgress progress record for this run
     * @return true when the run was detected as complete and marked successful
     */
    public boolean checkEtlProgressSuccess(ScrollInfo scrollInfo,EtlProgress etlProgress){
        if(scrollInfo.getScrollSize()==0){
            BigDecimal statisticalCount;
            if (Constants.statisticalCountMap.containsKey(etlProgress.getTicket())){
                // Expected total already cached for this ticket
                statisticalCount = Constants.statisticalCountMap.get(etlProgress.getTicket());
            }else{
                EtlStatistical etlStatistical = new EtlStatistical();
                etlStatistical.setLogicModel(etlProgress.getLogicModel());
                etlStatistical.setStartTime(DateUtils.format(etlProgress.getScrollTime()) );
                etlStatistical.setEndTime(DateUtils.format(etlProgress.getScrollEndTime()));
                // Expected total from the per-day statistics computed by CountCacheTask
                statisticalCount = migrateScrollMapper.getStatisticalCount(etlStatistical);
                // Cache only a real total: caching null would poison the ticket and
                // the unconditional compareTo below used to NPE on it.
                if (statisticalCount != null) {
                    Constants.statisticalCountMap.put(etlProgress.getTicket(),statisticalCount);
                }
            }
            if (statisticalCount == null) {
                // No statistics available yet — cannot confirm completion, keep scrolling
                return false;
            }
            if (new BigDecimal(etlProgress.getFinishRecord().toString()).compareTo(statisticalCount)>=0){
                // Progress reached 100% — mark the run complete
                updateEtlProgressSuccess(etlProgress);
                return true;
            }else{
                return false;
            }
        }
        return false;
    }

    /**
     * Persists the batch detail record with the given status, bumping its retry
     * counter.
     *
     * @param etlDirtyRecord batch detail record to update (must not be null)
     * @param status         status to stamp on the record
     */
    private void updateEtlDirtyRecord(EtlDirtyRecord etlDirtyRecord,Integer status){
        etlDirtyRecord.setUpdateTime(new DateTime());
        etlDirtyRecord.setStatus(status);
        etlDirtyRecord.setRetryTimes(etlDirtyRecord.getRetryTimes()+1);
        migrateScrollMapper.updateEtlDirtyRecord(etlDirtyRecord);
    }
}
